#!/usr/bin/python3

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = "Bastien Nocera"
__email__ = "hadess@hadess.net"
__copyright__ = "(c) 2019, 2021 Red Hat Inc."
__license__ = "LGPL 3+"

import unittest
import sys
import subprocess
import fcntl
import os

import taptestrunner

try:
    # Do all non-standard imports here so we can skip the tests if any
    # needed packages are not available.
    import dbus
    import dbus.mainloop.glib
    import dbusmock
    from gi.repository import GLib
    from gi.repository import Gio

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    class TestPowerProfileMonitor(dbusmock.DBusTestCase):
        """Test GPowerProfileMonitorDBus"""

        @classmethod
        def setUpClass(klass):
            klass.start_system_bus()
            klass.dbus_con = klass.get_dbus(True)

        def setUp(self):
            try:
                Gio.PowerProfileMonitor
            except AttributeError:
                raise unittest.SkipTest(
                    "Power Profile Monitor not in "
                    "introspection data. Requires "
                    "GObject-Introspection ≥ 1.63.2"
                )  # FIXME version
            try:
                (self.p_mock, self.obj_ppd) = self.spawn_server_template(
                    "upower_power_profiles_daemon", {}, stdout=subprocess.PIPE
                )
            except ModuleNotFoundError:
                raise unittest.SkipTest(
                    "power-profiles-daemon dbusmock template not "
                    "found. Requires dbusmock > 0.31.1."
                )  # FIXME version
            # set log to nonblocking
            flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
            fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            self.power_saver_enabled = False
            self.dbus_props = dbus.Interface(self.obj_ppd, dbus.PROPERTIES_IFACE)
            self.power_profile_monitor = Gio.PowerProfileMonitor.dup_default()
            assert "GPowerProfileMonitorDBus" in str(self.power_profile_monitor)
            self.power_profile_monitor.connect(
                "notify::power-saver-enabled", self.power_saver_enabled_cb
            )
            self.mainloop = GLib.MainLoop()
            self.main_context = self.mainloop.get_context()

        def tearDown(self):
            self.p_mock.terminate()
            self.p_mock.wait()

        def assertEventually(self, condition, message=None, timeout=5):
            """Assert that condition function eventually returns True.

            Timeout is in seconds, defaulting to 5 seconds. message is
            printed on failure.
            """
            if not message:
                message = "timed out waiting for " + str(condition)

            def timed_out_cb(message):
                self.fail(message)
                return GLib.SOURCE_REMOVE

            timeout_source = GLib.timeout_source_new_seconds(timeout)
            timeout_source.set_callback(timed_out_cb, message)
            timeout_source.attach(self.main_context)

            while not condition():
                self.main_context.iteration(True)

            timeout_source.destroy()

        def power_saver_enabled_cb(self, spec, data):
            self.power_saver_enabled = (
                self.power_profile_monitor.get_power_saver_enabled()
            )
            self.main_context.wakeup()

        def test_power_profile_power_saver_enabled(self):
            """power-saver-enabled property"""

            self.assertEqual(
                self.power_profile_monitor.get_power_saver_enabled(), False
            )
            self.dbus_props.Set(
                "org.freedesktop.UPower.PowerProfiles",
                "ActiveProfile",
                dbus.String("power-saver", variant_level=1),
            )
            self.assertEventually(
                lambda: self.power_saver_enabled == True,
                "power-saver didn't become enabled",
                1,
            )

            self.dbus_props.Set(
                "org.freedesktop.UPower.PowerProfiles",
                "ActiveProfile",
                dbus.String("balanced", variant_level=1),
            )
            self.assertEventually(
                lambda: self.power_saver_enabled == False,
                "power-saver didn't become disabled",
                1,
            )

except ImportError as e:

    @unittest.skip("Cannot import %s" % e.name)
    class TestPowerProfileMonitor(unittest.TestCase):
        def test_power_profile_power_saver_enabled(self):
            pass


if __name__ == "__main__":
    unittest.main(testRunner=taptestrunner.TAPTestRunner())
